﻿#ifndef __THREADFACTORY_H__
#define __THREADFACTORY_H__

#include "queueobject.h"
#include "anyval.h"
/************************************************************************************
* Thread factory
* The user supplies the thread implementation function, which processes the
* tasks pushed into the queue.
************************************************************************************/
using namespace std::chrono_literals;
// Abstract interface through which a worker thread talks back to its
// owning factory: query the stop flag and pull the next queued item.
class CThreadFactoryBase
{
public:
	virtual ~CThreadFactoryBase() = default;
	// Returns true once the factory has been asked to stop.
	virtual bool IsStop() = 0;
	// Pops one queued item into 'val'. A negative waitTime blocks
	// indefinitely; otherwise waits at most 'waitTime'. Returns false on
	// timeout or on shutdown with an empty queue.
	virtual bool Pop(AnyVar& val, std::chrono::milliseconds waitTime = -1ms) = 0;
};

// Tag types used to select the matching __ThreadFactory partial
// specialization below (task-based queue vs data-based queue).
struct TaskModel {};
struct DataModel {};

// Primary template: declared only. The TaskModel/DataModel partial
// specializations below provide the definitions.
template<typename T, typename P>
class __ThreadFactory;

// Task-model specialization: the queue holds ready-to-run
// std::packaged_task objects, each carrying its own promise/future pair.
template<typename T>
class __ThreadFactory<T, typename std::enable_if<std::is_same<typename T::ModelType, TaskModel>::value, TaskModel>::type>
	: public ThreadObject, public CThreadFactoryBase
{
	using ValType = typename T::ValueType;
	using ValuePtrType = typename T::ValuePtrType;
	using RetType = typename T::RetType;
	using RecvType = typename T::RecvType;
public:
	// Queue a callable task; returns a future for its eventual result.
	// Blocks while the queue is at capacity; throws std::runtime_error if
	// the factory has already been stopped.
	auto Push(typename T::RecvType val) ->std::future<RetType>
	{
		ValuePtrType task = std::make_shared<ValType>(std::move(val));
		std::future<RetType> future = task->get_future();
		{
			std::unique_lock<std::mutex> lock(m_mu);

			// Submitting work to a stopped factory is an error.
			if (m_bStop)
				throw std::runtime_error("向已停止的线程工厂提交作业");

			// Queue full: wait on the unique_lock, NOT the raw mutex --
			// waiting on m_mu directly would unlock the mutex behind the
			// unique_lock's back (or not compile at all with
			// std::condition_variable).
			m_condPush.wait(lock, [this] { return m_taskQueue.size() < m_nCapacity; });

			m_taskQueue.push(task);
		}

		m_condPop.notify_one(); // wake one worker to run the task
		return future;
	}
protected:
	std::queue<ValuePtrType> m_taskQueue;                    // pending tasks
};

// Pairs one unit of queued data with the promise that will receive the
// result of processing it. The promise is moved in, so the caller's
// promise is consumed by construction.
template <typename T, typename RET>
struct DataNode
{
	T data;                // payload handed to the worker
	std::promise<RET> res; // fulfilled by the worker with the result

	DataNode(const T& value, std::promise<RET>& resultPromise)
		: data(value)
		, res(std::move(resultPromise))
	{
	}
};

// Data-model specialization: the queue holds DataNode objects pairing an
// argument tuple with an explicitly managed promise.
template<typename T>
class __ThreadFactory<T, typename std::enable_if<std::is_same<typename T::ModelType, DataModel>::value, DataModel>::type>
	: public ThreadObject, public CThreadFactoryBase
{
	using ValType = typename T::ValueType;
	using ValuePtrType = typename T::ValuePtrType;
	using RetType = typename T::RetType;
	using RecvType = typename T::RecvType;
public:
	// Queue one data item; returns a future the caller can use to fetch
	// the processing result. Blocks while the queue is at capacity;
	// throws std::runtime_error if the factory has already been stopped.
	std::future<RetType> Push(const RecvType& val)
	{
		std::promise<RetType> prs;
		auto future = prs.get_future();
		ValuePtrType tupVal = std::make_shared<ValType>(val, prs);
		{
			std::unique_lock<std::mutex> lock(m_mu);

			// Submitting work to a stopped factory is an error.
			if (m_bStop)
				throw std::runtime_error("向已停止的线程工厂提交作业");

			// Queue full: wait on the unique_lock, NOT the raw mutex --
			// waiting on m_mu directly would unlock the mutex behind the
			// unique_lock's back (or not compile at all with
			// std::condition_variable).
			m_condPush.wait(lock, [this] { return m_taskQueue.size() < m_nCapacity; });

			m_taskQueue.push(tupVal);
		}

		m_condPop.notify_one(); // wake one worker to process the item
		return future;
	}
protected:
	std::queue<ValuePtrType> m_taskQueue;                    // pending data items
};


// Thread-pool front end: owns the worker threads and implements the
// consumer side of the queue. T is the user's job class; the queueing
// base is selected by T::ModelType (TaskModel or DataModel).
template<typename T>
class CThreadFactory : public __ThreadFactory<T, typename T::ModelType>
{
	// Portable replacement for the MSVC-specific __super keyword.
	using Base = __ThreadFactory<T, typename T::ModelType>;
public:
	~CThreadFactory()
	{
		Base::Stop();
		for (std::thread& thread : Base::m_pThreadPool) {
			if (thread.joinable())
				thread.join(); // wait for workers; they must eventually exit
		}
		Base::m_pThreadPool.clear();
	}

	// Launch nThreadNum worker threads. Each worker constructs its own T
	// from this factory plus the extra arguments p..., then runs T::Run().
	template<typename ...P>
	void Start(unsigned short nThreadNum = 1, P&&... p)
	{
		for (size_t i = 0; i < nThreadNum; i++)
		{
			Base::m_pThreadPool.emplace_back([this, p...]() {
				T obj(*this, p...);
				obj.Run();
			});
		}
	}

	// Pop one task from the queue into 'task'.
	// waitTime: negative blocks indefinitely; otherwise waits at most that
	// long. NOTE: the default must be the chrono literal -1ms -- a bare -1
	// does not convert (std::chrono::duration's converting ctor is explicit).
	bool Pop(AnyVar& task, std::chrono::milliseconds waitTime = -1ms) override
	{
		ThreadObject::m_idlThrNum++;
		{
			std::unique_lock<std::mutex> lock(ThreadObject::m_mu);
			if (waitTime.count() < 0)
			{
				// Block until stopped or a task arrives.
				this->m_condPop.wait(lock,
					[this] {
					return this->m_bStop.load() || !this->m_taskQueue.empty();
				});
			}
			else
			{
				auto status = Base::m_condPop.wait_for(lock, waitTime, [this] {
					return this->m_bStop.load() || !this->m_taskQueue.empty();
				});
				if (!status)
				{
					// Timed out with nothing to do.
					Base::m_idlThrNum--;
					return false;
				}
			}

			if (this->m_taskQueue.empty())
			{
				// The wait predicate passed with an empty queue, so the
				// factory is stopping. Always balance the idle counter
				// before returning (the old stop-only decrement could
				// leak the count).
				Base::m_idlThrNum--;
				return false;
			}

			task = std::move(this->m_taskQueue.front()); // take one task
			this->m_taskQueue.pop();
		}
		// A slot opened up: wake one blocked producer.
		Base::m_condPush.notify_one();
		Base::m_idlThrNum--;
		return true;
	}

	// Number of queued tasks.
	// NOTE(review): reads the queue without taking the lock -- racy if
	// called while workers are running; confirm callers accept that.
	virtual size_t GetTaskNum()
	{
		return Base::m_taskQueue.size();
	}

	bool IsStop() override
	{
		return ThreadObject::IsStop();
	}
};

// Trait bundle for the task-based model: jobs arrive as callables and are
// wrapped in std::packaged_task, so each task carries its own
// promise/future pair.
template<typename RET, typename... Args>
class BaseOnTask
{
public:
	using RetType      = RET;
	using ModelType    = TaskModel;
	using ValueType    = std::packaged_task<RET(Args ...)>;
	using ValuePtrType = std::shared_ptr<ValueType>;
	using RecvType     = std::function<RET(Args ...)>;
};

template<typename RET, typename... Args>
class BaseOnData
{
public:
	using ModelType = DataModel;
	using ValueType = DataNode<std::tuple<Args ...>, RET>;
	using ValuePtrType = std::shared_ptr<DataNode<std::tuple<Args ...>, RET>>;
	using RecvType = std::tuple<Args...>;
	using RetType = RET;
};


// Worker-side job interface: holds a reference to the owning factory and
// exposes typed access to its queue. Users derive from this (via
// BaseTaskThread/BaseDataThread) and implement Run().
template<typename T>
class ThreadJob : public T
{
public:
	using ValType = typename T::ValueType;
	using ValuePtrType = typename T::ValuePtrType;
	using RetType = typename T::RetType;
	using ModelType = typename T::ModelType;
	using RecvType = typename T::RecvType;

	ThreadJob(CThreadFactoryBase& factory) :m_factory(factory) {}
	// Polymorphic type (virtual Run) needs a virtual destructor so that
	// deleting a derived job through a ThreadJob* is well-defined.
	virtual ~ThreadJob() = default;

	// Fetch one job from the factory queue and cast it back to the typed
	// pointer. A negative waitTime blocks indefinitely. Returns false on
	// timeout or shutdown.
	bool GetJob(ValuePtrType& task, std::chrono::milliseconds waitTime = -1ms)
	{
		AnyVar val;
		if (m_factory.Pop(val, waitTime))
		{
			task = any_cast<ValuePtrType>(val);
			return true;
		}
		return false;
	}

	// Forwards the factory's stop flag.
	bool IsStop()
	{
		return m_factory.IsStop();
	}
	// Thread entry point, implemented by the user.
	virtual void Run() = 0;
private:
	CThreadFactoryBase& m_factory;
};


// Convenience base for task-model worker threads: derive and implement Run().
template<typename RET, typename... Args>
class BaseTaskThread : public ThreadJob<BaseOnTask<RET, Args...>>
{
	using Base = ThreadJob<BaseOnTask<RET, Args...>>;
public:
	using Base::Base; // inherit the factory-reference constructor
};

// Convenience base for data-model worker threads: derive and implement Run().
template<typename RET, typename... Args>
class BaseDataThread : public ThreadJob<BaseOnData<RET, Args...>>
{
	using Base = ThreadJob<BaseOnData<RET, Args...>>;
public:
	using Base::Base; // inherit the factory-reference constructor
};
#endif //__THREADFACTORY_H__